package org.luosl.webmagicx.scheduler

import java.io._
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock

import com.alibaba.fastjson.JSON
import org.apache.commons.io.FileUtils
import org.luosl.webmagicx.conf.MatcherConverters._
import org.apache.http.annotation.GuardedBy
import org.luosl.webmagicx.conf.PropType.strType
import org.luosl.webmagicx.conf.{SpiderConf, XmlProps}
import org.luosl.webmagicx.utils.ResourceUtils._
import us.codecraft.webmagic.scheduler.component.DuplicateRemover
import us.codecraft.webmagic.utils.JsonUtil
import us.codecraft.webmagic.{Request, Task}

import scala.io.{Codec, Source}


/**
  * 基于文件的优先级调度器
  * Created by luosl on 2018/3/19.
  */
/**
  * 基于文件的优先级调度器 (file-backed priority scheduler).
  *
  * Persists crawl state so a spider can resume after a restart:
  *  - every pushed request is appended as one JSON line to a queue file;
  *  - every successful poll appends the running cursor value (number of
  *    requests consumed so far) as one line to a cursor file.
  * On startup the last cursor value is read back and only the requests whose
  * position in the queue file is at or beyond it are re-queued; all URLs are
  * replayed into the duplicate-removal set.
  *
  * Created by luosl on 2018/3/19.
  */
class PriorityFileScheduler(sc:SpiderConf, task:Task, props:XmlProps) extends AbstractScheduler(sc, task, props) with DuplicateRemover with Closeable{

  // Single background thread that periodically flushes the two writers to disk.
  private val threadScheduledExecutor:ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

  // In-memory pending queue, ordered by requestComparator (from AbstractScheduler).
  private val priorityQueue = new PriorityBlockingQueue[Request](10, requestComparator)

  private val duplicateLock:ReentrantLock = new ReentrantLock()

  private val pollLock:ReentrantLock = new ReentrantLock()

  private val pushLock:ReentrantLock = new ReentrantLock()

  // Every URL ever pushed (duplicate-removal set). NOTE(review): guarded by
  // duplicateLock only in isDuplicate; init/clean/resetDuplicateCheck touch it
  // unguarded — presumably those run while no crawl threads are active; verify.
  private val urls:util.HashSet[String] = new util.HashSet[String]()

  // Count of requests polled so far; its value is persisted on each poll.
  private val cursor:AtomicInteger = new AtomicInteger(0)

  /**
    * 数据存放路径 (absolute directory holding the queue/cursor files)
    */
  var dataDir:String = _

  // Append-mode writers over queuePath / cursorPath, created by initWriter().
  var queueWriter:BufferedWriter = _
  var cursorWriter:BufferedWriter = _

  // 设置 DuplicateRemover
  this.setDuplicateRemover(this)
  // 初始化
  init()

  /** Full start-up sequence: data dir, state restore, writers, periodic flush. */
  def init(): Unit ={
    initDataDir()
    initQueueAndCursor()
    initWriter()
    initFlushTask()
    logInfo("初始化 PriorityFileScheduler 组件成功")
  }

  /**
    * 初始化 队列 和 游标 — restore the in-memory queue and cursor from disk.
    *
    * The last non-blank line of the cursor file is the number of requests
    * already consumed; requests before that position are not re-queued.
    */
  def initQueueAndCursor(): Unit ={
    priorityQueue.clear()
    urls.clear()
    // 获取最后一个有效的 cursor.
    // BUGFIX: seed is "0" (was "1") so an existing-but-empty cursor file means
    // "nothing consumed", consistent with the missing-file case below; blank
    // lines are skipped so a trailing newline cannot break .toInt; foldLeft
    // (was fold) guarantees left-to-right order, i.e. the LAST line wins.
    val cursorFile:File = new File(cursorPath)
    val lastCursor:Int = if(cursorFile.exists()){
      withClose(Source.fromFile(cursorPath)(Codec.UTF8)){ source=>
        val last:Int = source.getLines()
          .map(_.trim)
          .filter(_.nonEmpty)
          .foldLeft("0")( (_, line) => line )
          .toInt
        cursor.set(last)
        cursor.get()
      }
    }else 0
    // 加载 cursor 之后的数据 — positions 0 .. lastCursor-1 were already polled.
    // BUGFIX: condition is `idx >= lastCursor` (was `idx >= lastCursor - 1`,
    // which re-queued the last already-polled request on every restart).
    val queueFile:File = new File(queuePath)
    if(queueFile.exists()){
      withClose(Source.fromFile(queueFile)(Codec.UTF8)){ source =>
        source.getLines().foldLeft(0){ (idx,line) =>
          val req:Request = JSON.parseObject(line,classOf[Request])
          urls.add(req.getUrl)
          if(idx >= lastCursor){
            priorityQueue.add(req)
          }
          idx + 1
        }
      }
    }
  }

  /**
    * 初始化 DataDir — resolve the data directory from props and create it if
    * missing. Fails fast when the configured path points at a regular file.
    */
  def initDataDir(): Unit ={
    val path:String = props.valueOrDefault("dataDir", "value")(strType)("")
    val f:File = new File(path)
    if(f.isFile) throw new RuntimeException(s"${f.getAbsolutePath} is a file!!")
    if(!f.exists()){
      FileUtils.forceMkdir(f)
      logInfo(s"make dir: ${f.getAbsolutePath}")
    }
    dataDir = f.getAbsolutePath
  }

  /**
    * 初始化 Writer — open both files in append mode (UTF-8) so restored state
    * written by earlier runs is never clobbered.
    */
  def initWriter(): Unit ={
    queueWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(queuePath, true), "UTF-8"))
    cursorWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(cursorPath, true), "UTF-8"))
  }

  /**
    * 初始化 flush 任务 — flush both writers every 5 seconds so at most ~5s of
    * state is lost on a crash.
    */
  def initFlushTask(): Unit ={
    threadScheduledExecutor.scheduleWithFixedDelay(
      ()=> {
        queueWriter.flush()
        cursorWriter.flush()
      }, 0, 5, TimeUnit.SECONDS )
  }

  /**
    * 队列路径 — one JSON-serialized request per line, in push order.
    * @return absolute path of the queue file for this task
    */
  def queuePath:String = s"$dataDir${File.separator}${task.getUUID}.queue.txt"

  /**
    * 游标路径 — one cursor value per line; only the last line matters.
    * @return absolute path of the cursor file for this task
    */
  def cursorPath:String = s"$dataDir${File.separator}${task.getUUID}.cursor.txt"

  override def getLeftRequestsCount(task: Task): Int = priorityQueue.size()

  override def getTotalRequestsCount(task: Task): Int = urls.size()

  /**
    * get an url to crawl
    *
    * @param task the task of spider
    * @return the url to crawl, or null when the queue is empty
    */
  @GuardedBy("pollLock")
  override def poll(task: Task): Request = withLock(pollLock){ _ =>
    val req:Request = priorityQueue.poll()
    if(null != req) {
      // Persist the new consumed count; one line per poll (file grows, only
      // the last line is read back on restore).
      cursorWriter.write(s"${cursor.incrementAndGet()}")
      cursorWriter.newLine()
    }
    req
  }

  /** Enqueue a request already known to be non-duplicate and journal it to disk. */
  @GuardedBy("pushLock")
  override protected def pushWhenNoDuplicate(request: Request, task: Task): Unit = withLock(pushLock){ _ =>
    priorityQueue.put(request)
    queueWriter.write(JsonUtil.toJSONString(request))
    queueWriter.newLine()
  }
  /**
    *
    * Check whether the request is duplicate.
    *
    * @param request request
    * @param task    task
    * @return true if is duplicate
    */
  @GuardedBy("duplicateLock")
  override def isDuplicate(request: Request, task: Task): Boolean = withLock(duplicateLock)( _ => !urls.add(request.getUrl) )


  /**
    * Reset duplicate check.
    *
    * @param task task
    */
  override def resetDuplicateCheck(task: Task): Unit = {
    urls.clear()
  }

  /**
    * Drop all in-memory state and delete the on-disk files.
    * BUGFIX: FileUtils.forceDelete throws FileNotFoundException for a missing
    * file, so existence is checked first — clean() is now safe to call twice.
    */
  def clean(): Unit ={
    priorityQueue.clear()
    urls.clear()
    val queueFile:File = new File(queuePath)
    if(queueFile.exists()) FileUtils.forceDelete(queueFile)
    val cursorFile:File = new File(cursorPath)
    if(cursorFile.exists()) FileUtils.forceDelete(cursorFile)
  }

  /** Stop the flush thread, close the writers, and remove the state files. */
  override def close(): Unit = {
    // BUGFIX: awaitTermination never completes unless shutdown() is requested
    // first — the old code just blocked 10 seconds and leaked the non-daemon
    // flush thread, which could keep the JVM alive.
    this.threadScheduledExecutor.shutdown()
    if(!this.threadScheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)){
      this.threadScheduledExecutor.shutdownNow()
    }
    queueWriter.close()
    cursorWriter.close()
    clean()
  }
}
